package com.isyscore.os.metadata.service.executor;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.isyscore.boot.login.LoginUserManager;
import com.isyscore.boot.login.dto.ApiRequest;
import com.isyscore.os.core.exception.DataFactoryException;
import com.isyscore.os.core.model.entity.DataFlowDefinition;
import com.isyscore.os.core.model.entity.DataFlowTask;
import com.isyscore.os.core.util.ApplicationUtils;
import com.isyscore.os.core.util.InitiallyUtils;
import com.isyscore.os.metadata.controller.OpenApiController;
import com.isyscore.os.metadata.database.AbstractDatabase;
import com.isyscore.os.metadata.enums.*;
import com.isyscore.os.metadata.kettle.base.TransPreview;
import com.isyscore.os.metadata.manager.DatabaseManager;
import com.isyscore.os.metadata.model.dto.DataSourceDTO;
import com.isyscore.os.metadata.model.vo.ResultVO;
import com.isyscore.os.metadata.service.DataFlowDefinitionService;
import com.isyscore.os.metadata.service.DataFlowTaskService;
import com.isyscore.os.metadata.service.DataSourceService;
import com.isyscore.os.metadata.service.impl.DataSourceServiceImpl;
import com.isyscore.os.metadata.utils.CronUtils;
import com.isyscore.os.metadata.utils.IncrSqlBuilder;
import com.isyscore.os.metadata.utils.TimeUtil;
import com.isyscore.os.permission.entity.LoginVO;
import groovy.lang.Tuple2;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.logging.KettleLogLayout;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.KettleLoggingEvent;
import org.pentaho.di.core.logging.LoggingRegistry;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransListener;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepStatus;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.isyscore.os.core.exception.ErrorCode.BOOT_FAILED;
import static com.isyscore.os.core.exception.ErrorCode.JOB_IS_RUNNING;

@Slf4j
public class FlowExecutor extends BaseExecutor {
    private static final DataFlowTaskService dataFlowTaskService = ApplicationUtils.getBean(DataFlowTaskService.class);
    private static final DatabaseManager databaseManager = ApplicationUtils.getBean(DatabaseManager.class);
    private static final DataSourceService dataSourceService = ApplicationUtils.getBean(DataSourceServiceImpl.class);
    private static final DataFlowDefinitionService dataFlowDefinitionService = ApplicationUtils.getBean(DataFlowDefinitionService.class);
    // Per-definition retry attempt counters (definitionId -> attempts).
    // Fixed: the original used a raw ConcurrentHashMap; the diamond keeps it type-checked.
    private static final Map<String, Integer> retryMap = new ConcurrentHashMap<>();
    // Failed executions waiting for their next retry attempt.
    private static final DelayQueue<DelayTask<FlowExecutor>> delayReTryQueue = new DelayQueue<>();
    /**
     * Cache of running flow executors, keyed by executionId (the Kettle batch id).
     */
    private static final Cache<String, FlowExecutor> executors = CacheBuilder.newBuilder()
            .maximumSize(100)
            .expireAfterAccess(10, TimeUnit.SECONDS)
            .build();

    // Name prefixes (text before '#') of flows currently running; used to reject concurrent starts.
    private static final Set<String> runningFlow = Collections.synchronizedSet(new HashSet<>());

    /**
     * Cache of preview data per task, expiring one minute after being written.
     */
    public static final Cache<String, TransPreview> previewData = CacheBuilder.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .build();
    public static final int MAX_RETRY_TIMES = 3;

    /**
     * Start the background consumer of the retry delay queue.
     */
    static {
        startDelayedTaskProcessor();
    }


    // Kettle batch id of the running transformation; assigned in runJob() once the
    // trans is ready and read from other threads (hence volatile).
    private volatile String executionId;
    private final TransExecutionConfiguration executionConfiguration;
    private final TransMeta transMeta;
    // Primary key (as a string) of the flow definition this executor runs.
    private final String definitionId;
    private Trans trans;
    // How this run was triggered (see KettleJobTriggerType).
    private final KettleJobTriggerType triggerType;
    // Optional callback fired when the run finishes.
    private final OpenApiController.CallBack callBack;
    private final DataFlowDefinition definition;
    // Produces the growing delay intervals used when retrying failed runs.
    private final RetryIntervalGenerator retryIntervalGenerator;
    // Flipped to false by CustomTransListener when the trans finished with errors.
    // NOTE(review): written from a Kettle listener thread and read after
    // waitUntilFinished(); presumably published safely, but consider volatile — confirm.
    private boolean noError =true;
    /**
     * Builds an executor for one flow definition.
     *
     * @param arg          variables injected into the transformation before it runs
     * @param transMeta    the Kettle transformation metadata to execute
     * @param definitionId primary key of the DataFlowDefinition
     * @param triggerType  how this run was triggered
     * @param back         optional callback invoked when the run finishes (may be null)
     */
    private FlowExecutor(Map<String, String> arg, TransMeta transMeta, String definitionId, KettleJobTriggerType triggerType, OpenApiController.CallBack back) {
        this.transMeta = transMeta;
        this.definitionId = definitionId;
        this.triggerType = triggerType;
        this.callBack = back;

        TransExecutionConfiguration configuration = new TransExecutionConfiguration();
        configuration.setVariables(arg);
        this.executionConfiguration = configuration;
        this.definition = InitiallyUtils.markIgnore(() -> dataFlowDefinitionService.getById(definitionId));
        TaskPeriodUnit cronPeriod;
        if (StringUtils.isEmpty(getCron())) {
            // No schedule configured: fall back to the minute interval.
            cronPeriod = TaskPeriodUnit.minute;
        } else {
            cronPeriod = CronUtils.getCornPeriod(getCron());
        }
        // Base retry delay is half the max delay, in milliseconds. Multiplying by 1000L
        // keeps the arithmetic in long (the original boxed to Integer and could overflow int).
        this.retryIntervalGenerator = new RetryIntervalGenerator(cronPeriod.getMaxDelayTime() * 1000L / 2, 2, 3);
    }

    /**
     * 获取最终步骤的预览数据字段
     */
    /**
     * Returns the cached preview rows of the final step for the given task,
     * or an empty list when no preview is cached.
     */
    public static List<List<String>> getPreviewData(String taskId) {
        TransPreview preview = previewData.getIfPresent(taskId);
        return preview == null
                ? Collections.emptyList()
                : preview.getLastStepData().stream().map(Arrays::asList).collect(Collectors.toList());
    }

    /**
     * 获取指定步骤的预览数据
     */
    /**
     * Returns the cached preview rows of the named step for the given task,
     * or an empty list when no preview is cached.
     */
    public static List<List<String>> getPreviewData(String taskId, String stepName) {
        TransPreview preview = previewData.getIfPresent(taskId);
        return preview == null
                ? Collections.emptyList()
                : preview.getData(stepName).stream().map(Arrays::asList).collect(Collectors.toList());
    }

    /**
     * 获取任务最终的预览数据字段
     */
    /**
     * Returns the field names of the final preview step for the given task,
     * or an empty list when no preview is cached.
     */
    public static List<String> getPreviewDataFields(String taskId) {
        TransPreview preview = previewData.getIfPresent(taskId);
        if (preview == null) {
            return Collections.emptyList();
        }
        return preview.getLastFieldNames();
    }

    /**
     * 获取指定步骤的预览数据字段
     */
    /**
     * Returns the field names of the named preview step for the given task,
     * or an empty list when no preview is cached.
     * Fix: the original checked {@code transPreview != null} and returned empty,
     * then dereferenced the null reference — it could never return data and
     * threw a NullPointerException on a cache miss.
     */
    public static List<String> getPreviewDataFields(String taskId, String stepName) {
        TransPreview transPreview = previewData.getIfPresent(taskId);
        if (transPreview == null) {
            return Collections.emptyList();
        }
        return transPreview.getFieldNames(stepName);
    }

    /**
     * Static factory for a FlowExecutor; see the private constructor for parameter details.
     */
    public static FlowExecutor createFlowExecutor(Map<String, String> arg, TransMeta transMeta, String definitionId, KettleJobTriggerType triggerType, OpenApiController.CallBack back) {
        return new FlowExecutor(arg, transMeta, definitionId, triggerType, back);
    }

    /**
     * 获得任务运行生成的ExecutionId
     */
    /**
     * Waits up to 10 seconds for the Kettle batch id (executionId) that runJob()
     * publishes when the transformation starts, polling every 100 ms.
     * Fixes over the original: the poll loop exits immediately once the id appears
     * (the old loop slept one extra cycle), interruption restores the interrupt flag,
     * and the failure log passes the exception as the last argument so SLF4J records
     * the stack trace (the old "{}" placeholder consumed the Throwable).
     *
     * @return the executionId of the running transformation
     * @throws DataFactoryException with BOOT_FAILED when the id does not appear in time
     */
    public String getExecutionId() {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            long deadline = System.currentTimeMillis() + 10 * 1000;
            try {
                // Poll until runJob() publishes the id or the deadline passes.
                while (StringUtils.isEmpty(executionId) && System.currentTimeMillis() < deadline) {
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return executionId;
        });
        try {
            return cf.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            cf.cancel(true);
            log.error("异步获取任务ExecutionId失败", e);
            throw new DataFactoryException(BOOT_FAILED);
        }
    }

    /**
     * 获取当前流程实例的实时日志
     */
    /**
     * Collects the real-time Kettle log lines of this transformation (including all
     * child log channels) into a single newline-separated string.
     */
    public String getExecutionLog() {
        KettleLogLayout layout = new KettleLogLayout(true);
        List<String> childIds = LoggingRegistry.getInstance().getLogChannelChildren(trans.getLogChannelId());
        List<KettleLoggingEvent> events = KettleLogStore.getLogBufferFromTo(childIds, true, -1, KettleLogStore.getLastBufferLineNr());
        StringBuilder buffer = new StringBuilder();
        for (KettleLoggingEvent event : events) {
            buffer.append(layout.format(event).trim()).append("\n");
        }
        return buffer.toString();
    }


    /**
     * 获取每一个步骤运行的状态 在运行 已完成 已停止  需要处理节点并发时候的日志合并问题
     */
    /**
     * Returns the status of every step (running / finished / stopped). Parallel copies
     * of the same step name are merged by keeping the highest status level; a status
     * description the enum cannot map (level -1) is recorded as FAIL. Steps whose name
     * ends with "-默认排序" (implicit sort steps) are omitted from the result.
     * Fix: in the original, a newly-seen step with an unmappable status was first set
     * to FAIL and then immediately overwritten with the raw -1 level (dead write);
     * the FAIL mapping now sticks.
     *
     * @return one map per step with keys "stepName" and "status"
     */
    public List<Map<String, String>> getStepStatus() {
        Map<String, Integer> levels = new HashMap<>();
        for (int i = 0; i < trans.nrSteps(); i++) {
            StepInterface baseStep = trans.getRunThread(i);
            StepStatus stepStatus = new StepStatus(baseStep);
            log.info("{}步骤状态：{}", baseStep.getStepname(), stepStatus.getStatusDescription());
            String name = stepStatus.getStepname();
            // Compute the level once instead of re-deriving it for every comparison.
            int level = KettleTaskStatus.getLevelByStatus(KettleTaskStatus.getTypeByLable(stepStatus.getStatusDescription()));
            if (level == -1) {
                // Unknown status description: treat the step as failed.
                levels.put(name, KettleTaskStatus.FAIL.getLevel());
            } else if (!levels.containsKey(name) || levels.get(name) < level) {
                levels.put(name, level);
            }
        }

        List<Map<String, String>> status = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : levels.entrySet()) {
            if (entry.getKey().endsWith("-默认排序")) {
                continue;
            }
            Map<String, String> s = new HashMap<>();
            s.put("stepName", entry.getKey());
            s.put("status", KettleTaskStatus.getTypeByLevel(entry.getValue()));
            status.add(s);
        }

        return status;
    }

    /**
     * Registers this executor in the running-executor cache under its executionId.
     * NOTE(review): assumes executionId has already been assigned by runJob() — confirm.
     */
    public void holdExecutor() {
        executors.put(executionId, this);
    }

    /**
     * Exposes the cache of running executors, keyed by executionId.
     */
    public static Cache<String, FlowExecutor> getExecutors() {
        return executors;
    }

    /**
     * Runs the Kettle transformation once: saves a DataFlowTask record, prepares and
     * starts the trans, waits for completion, then updates the task status, schedules
     * a retry on failure, persists the incremental marker on success, and fires the
     * API callback. Logic and statement order are unchanged from the original except
     * the error log, which now passes the exception as the last argument so SLF4J
     * records the stack trace (the old "{}" placeholder consumed the Throwable).
     */
    @Override
    protected void runJob() {
        if (canRetry() || definition.getRetryRun() == 0) {
            DataFlowTask taskLog = new DataFlowTask();
            taskLog.setStartTime(LocalDateTime.now().minusSeconds(1));
            LoginUserManager loginUserManager = ApplicationUtils.getBean(LoginUserManager.class);
            // Rewrite the table-input SQL for incremental extraction when enabled.
            buildSql(definition);
            LoginVO loginVO = new LoginVO();
            loginVO.setTenantId(definition.getTenantId());
            loginUserManager.executeWithAssignedLoginUser(loginVO, () -> {
                try {
                    taskLog.setDefinitionId(Long.parseLong(definitionId));
                    taskLog.setStatus(KettleJobStatus.RUNNING.getCode());
                    taskLog.setTriggerMethod(triggerType.getCode());
                    taskLog.setKettleBatchId("");
                    dataFlowTaskService.save(taskLog);
                    transMeta.injectVariables(executionConfiguration.getVariables());
                    trans = new Trans(transMeta);
                    trans.addTransListener(new CustomTransListener());
                    trans.prepareExecution(null);
                    if (trans.isReadyToStart()) {
                        executionId = String.valueOf(trans.getBatchId());
                        log.info("executionId:{}", executionId);
                        taskLog.setKettleBatchId(executionId);
                        // Persist the executionId immediately, otherwise logs cannot be
                        // fetched while the transformation is still running.
                        dataFlowTaskService.updateById(taskLog);
                        trans.startThreads();
                        runningFlow.add(transMeta.getName().split("#")[0]);
                        // Keep this executor reachable for stop/log/status queries.
                        holdExecutor();
                        trans.waitUntilFinished();
                        taskLog.setStatus(KettleJobStatus.FINISHED.getCode());
                    }
                } catch (Exception e) {
                    log.error("流程运行异常：", e);
                    taskLog.setStatus(KettleJobStatus.FAILED.getCode());
                    //TODO when the trans fails to start, it does not log through its own channel; record the error on the task
                } finally {
                    runningFlow.remove(transMeta.getName().split("#")[0]);
                    taskLog.setEndTime(LocalDateTime.now());

                    if (!noError) {
                        taskLog.setStatus(KettleJobStatus.FAILED.getCode());
                        // Only errors produced while the flow was actually running are retried.
                        goToRetry();
                    }
                    dataFlowTaskService.updateById(taskLog);

                    // Persist the advanced incremental marker only on success.
                    if (taskLog.getStatus() != KettleJobStatus.FAILED.getCode() && "YES".equals(definition.getIsIncre())) {
                        dataFlowDefinitionService.updateById(definition);
                    }

                    // API callback
                    callBack(taskLog, null);
                }
                return null;
            });
        }
    }

    /**
     * Returns true when this run is permitted under the retry policy (retryRun == 1),
     * incrementing the per-definition attempt counter and clearing it once the counter
     * exceeds MAX_RETRY_TIMES. Returns false when retries are disabled.
     */
    private boolean canRetry() {
        if (definition.getRetryRun() != 1) {
            return false;
        }
        Integer attempts = retryMap.compute(definitionId, (k, v) -> v == null ? 1 : v + 1);
        if (attempts == MAX_RETRY_TIMES + 1) {
            retryMap.remove(definitionId);
        }
        return true;
    }

    /**
     * Rejects a start when a flow with the same name prefix is already running:
     * fires a failure callback and throws JOB_IS_RUNNING. Returns false otherwise.
     */
    @Override
    public boolean jobIsRunning() {
        String flowName = transMeta.getName().split("#")[0];
        if (!runningFlow.contains(flowName)) {
            return false;
        }
        DataFlowTask task = new DataFlowTask();
        task.setStatus(KettleJobStatus.FAILED.getCode());
        callBack(task, JOB_IS_RUNNING.getMessage());
        throw new DataFactoryException(JOB_IS_RUNNING);
    }

    /**
     * Derives the scheduling period unit from the definition's cron expression.
     * NOTE(review): getCron() returns null when no schedule is configured; presumably
     * CronUtils.getCornPeriod tolerates null — confirm.
     */
    @Override
    TaskPeriodUnit getTaskPeriodUnit() {
        return CronUtils.getCornPeriod(getCron());
    }

    /**
     * POSTs the task result to the caller-supplied callback URL as a small JSON body.
     * Fixes over the original: string concatenation bound tighter than the
     * {@code == null} ternary, so the condition compared a concatenated (never-null)
     * string and the JSON prefix was silently dropped from the body; the OkHttp
     * Response is now closed so the underlying connection is not leaked.
     *
     * @param taskLog the finished (or rejected) task whose status is reported
     * @param message optional message; when null, the status description is used
     */
    private void callBack(DataFlowTask taskLog, String message) {
        if (callBack == null) {
            return;
        }
        try {
            KettleJobStatus jobStatus = KettleJobStatus.getByCode(taskLog.getStatus());
            String text = message == null ? jobStatus.getDesc() : message;
            String json = "{\"status\": \"" + jobStatus.name() + "\",\"message\": \"" + text + "\"}";
            OkHttpClient client = new OkHttpClient().newBuilder()
                    .build();
            MediaType mediaType = MediaType.parse("application/json");
            RequestBody body = RequestBody.create(mediaType, json);
            Request request = new Request.Builder()
                    .url(callBack.getUrl())
                    .method("POST", body)
                    .addHeader("token", ApiRequest.getInstance().refreshToken())
                    .addHeader("Content-Type", "application/json")
                    .build();
            // Close the response to release the connection.
            client.newCall(request).execute().close();
        } catch (IOException e) {
            log.error("回调失败：{}", callBack);
        }
    }

    /**
     * When incremental extraction is enabled ("YES"), rewrites the SQL of the flow's
     * first TableInput step so it only reads rows whose incremental column lies between
     * the stored marker and the column's current maximum, then advances the in-memory
     * marker on the definition (persisted later by runJob on success).
     * NOTE(review): assumes the first hop's from-step is the table input feeding the
     * flow, and that the datasource id is the numeric prefix of the database meta name
     * ("&lt;id&gt;-...") — confirm against how the meta is constructed.
     */
    private void buildSql(DataFlowDefinition definition) {
        String incrEndValue;
        // Build the incremental SQL
        if ("YES".equals(definition.getIsIncre())) {
            StepMeta stepMeta = transMeta.getTransHops().get(0).getFromStep();
            if (stepMeta.getTypeId().equals(KettleDataFlowNodeType.TableInput.name())) {
                TableInputMeta tableInputMeta = (TableInputMeta) stepMeta.getStepMetaInterface();

                DataSourceDTO dataSourceDTO = dataSourceService.getDataSourceMarkIgnore(Long.parseLong(tableInputMeta.getDatabaseMeta().getName().split("-")[0]));
                // Strip the schema qualifier and any quoting from the incremental table name.
                String table = definition.getIncrTable().contains(".") ? definition.getIncrTable().split("\\.")[1].replaceAll("\"", "").replaceAll("`", "") : definition.getIncrTable().replaceAll("\"", "").replaceAll("`", "");
                dataSourceDTO.setTableName(table);
                dataSourceDTO.setSelectDatabaseName(tableInputMeta.getDataBaseName());
                ResultVO tableStruct = databaseManager.getTableStruct(dataSourceDTO);
                // Map of columnName -> dataType for the incremental table.
                Map<String, String> filedTypes = tableStruct.getContent().stream().map(m -> new Tuple2<>(String.valueOf(m.get("columnName")), String.valueOf(m.get("dataType")))).collect(Collectors.toMap(Tuple2::getFirst, Tuple2::getSecond));
                AbstractDatabase db = DatabaseManager.findDb(dataSourceDTO);

                // Current maximum of the incremental column (upper bound of this run).
                incrEndValue = String.valueOf(databaseManager.maxValue(dataSourceDTO, databaseManager.transFieldFormat(dataSourceDTO, definition.getIncreField(), removeTypeSpec(filedTypes.get(definition.getIncreField())))));
                // First run: no marker stored yet, start from the column's minimum.
                boolean first = false;
                if (StringUtils.isEmpty(definition.getIncreValue())) {
                    first = true;
                    definition.setIncreValue(String.valueOf(databaseManager.minValue(dataSourceDTO, databaseManager.transFieldFormat(dataSourceDTO, definition.getIncreField(), removeTypeSpec(filedTypes.get(definition.getIncreField()))))));
                }
                String sql = tableInputMeta.getSQL();
                // Build the incremental SQL and install it on the table input step.
                String incrSql = new IncrSqlBuilder().generateIncrSql(sql, definition.getIncrTable(), definition.getIncreFieldType(),
                        databaseManager.transFieldFormat(dataSourceDTO, definition.getIncreField(), removeTypeSpec(filedTypes.get(definition.getIncreField()))),
                        definition.getIncreField(), definition.getIncreValue(), incrEndValue, first, db, removeTypeSpec(filedTypes.get(definition.getIncreField())));
                tableInputMeta.setSQL(incrSql);

                // Advance the in-memory marker; the caller persists it on success.
                definition.setIncreValue(incrEndValue);
            }
        }
    }

    /**
     * Stops all running steps and removes this executor from the running caches.
     * Fix: the executor cache is keyed by executionId (a String); the original
     * called {@code executors.invalidate(this)}, which never matched a key, so the
     * entry was never actually removed.
     */
    public void stopAndRemove() {
        trans.stopAll();
        executors.invalidate(executionId);
        runningFlow.remove(transMeta.getName().split("#")[0]);
    }

    /**
     * Exposes the set of flow-name prefixes currently running.
     */
    public static Set<String> getRunningFlow() {
        return runningFlow;
    }

    /**
     * Strips any bracketed suffix from a column type name, e.g. "varchar(255)" -> "varchar".
     * Covers ASCII and full-width angle, round, square and curly brackets.
     * NOTE(review): throws NullPointerException when {@code type} is null (e.g. a column
     * missing from the table-struct map) — confirm callers always pass a known column.
     */
    private String removeTypeSpec(String type) {
        final String bracketPattern = "\\<.*?\\>|\\(.*?\\)|\\（.*?\\）|\\[.*?\\]|\\【.*?\\】|\\{.*?\\}";
        return type.replaceAll(bracketPattern, "");
    }

    /**
     * Queues this executor for a delayed retry when retries are enabled and the
     * interval generator still has attempts left.
     * (Diamond added — the original created a raw {@code DelayTask}.)
     */
    private void goToRetry() {
        if (definition.getRetryRun() == 1 && retryIntervalGenerator.hasNext()) {
            delayReTryQueue.add(new DelayTask<>(retryIntervalGenerator.next(), this));
        }
    }

    /**
     * Builds the cron expression from the definition's schedule settings,
     * or returns null when any of interval / time unit / start time is missing.
     */
    private String getCron() {
        boolean scheduled = definition.getScheduleInterval() != null
                && definition.getScheduleIntervalTimeUnit() != null
                && definition.getScheduleStartTime() != null;
        return scheduled
                ? TimeUtil.getCronExpression(definition.getScheduleInterval(), definition.getScheduleIntervalTimeUnit(), definition.getScheduleStartTime())
                : null;
    }

    /**
     * Starts the background thread that consumes the retry delay queue and re-executes
     * failed flows. Fixes over the original: an exception thrown by one retry no longer
     * kills the consumer loop permanently, failures are logged through SLF4J instead of
     * printStackTrace(), interruption is honored, and the thread is a daemon so it
     * cannot block JVM shutdown.
     */
    private static void startDelayedTaskProcessor() {
        Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    DelayTask<FlowExecutor> delayedTask = delayReTryQueue.take();
                    delayedTask.getTask().execute(true);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop consuming.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // One failed retry must not kill the consumer.
                    log.error("重试任务执行失败", e);
                }
            }
        });
        t.setName("DelayReTryTaskCustomer");
        t.setDaemon(true);
        t.start();
    }


    /**
     * Listens to transformation lifecycle events; only the finish event matters here:
     * it clears {@code noError} when any step reported errors.
     */
    class CustomTransListener implements TransListener {

        @Override
        public void transStarted(Trans trans) {
            // no-op
        }

        @Override
        public void transActive(Trans trans) {
            // no-op
        }

        @Override
        public void transFinished(Trans trans) {
            // trans.getErrors() > 0 means at least one step failed.
            noError = noError && trans.getErrors() == 0;
        }
    }


}
